2 min read

JS stdout大数据量连续写入管道导致堵塞问题的解决方案

一个大数据管理方面的经验,学到了计算机底层的buffer相关知识

问题背景:在一次将客户服务器的 SAP hana 数据库的数据通过 ETL 工具转移到 clickhouse 数仓时,发现本应是 600 多万行的数据,只有十几万到了数仓,且后台和数据库日志中没有任何报错信息。

目前项目中的 etl 工具是singer.io。框架是 tap 和 target,分别代表数据源和数据目标池。在这个例子中,tap 是一个自定义的 nodejs 脚本,实现从客户服务器的 hana 数据库获得数据,通过 linux 的管道传到 target(clickhouse)。管道的输入端就是 stdout。

如果将 etl 抽取的数据范围缩小,比如limit 100000, 最终落到 ck 中的数据是正常的。说明,数据量是问题的关键。多测试几次发现,数据量可能会在一个 160 多万的数字上下飘动。继续实验,我们模拟一个比较长的 js 对象字符串,console.log 打印出来,并把打印输出通过管道送到一个 csv 中,重复 600 万次。代码如下:

for (let i = 0; i < 6000000; i++) {
  console.log(JSON.stringify(bigObject));
}

然后执行命令:node xx.js | data.txt。检查data.txt行数,果然少于 600 万行。这里可以推断出来,是 js 脚本把 buffer 资源都吃光了,并在 buffer 已满时直接退出,且不返回任何报错或警告。于是,我们经过查阅资料,找到了用process.stdout.write()方法来替代console.log。因为process.stdout.write()可以接收一个回调函数作为参数,作为数据确认输出后的处理函数。所以,我们把上述代码改造为:

const writeAsync = (line) => {
  return new Promise((resolve, reject) => {
    process.stdout.write(`${line}\n`, () => {
      resolve("ok");
    });
  });
};

(async () => {
  for (let i = 0; i < 6000000; i++) {
    await writeAsync(JSON.stringify(bigObject));
  }
})();

用异步的逻辑,保证上一条数据成功输出后,才执行下一次输出。如果不成功,就会一直等待,直到 buffer 有新的剩余空间。这次执行完,data.txt行数果然来到了 600 万,这说明我们的推断是正确的。然后,给 hana 数据库抽取脚本改造成上述逻辑,核心部分代码:

client.execute(sql, (err, rs) => {
  if (err) {
    client.end();
    throw new Error(err);
  }

  rs.setFetchSize(2048);
  rs.createObjectStream()
    .on("data", async (data) => {
      const recordMessage = {
        type: "RECORD",
        stream: schemaID,
        record: data,
      };
      await writeAsync(JSON.stringify(recordMessage));
    })
    .on("finish", function () {
      if (!rs.closed) {
        rs.close();
      }
      client.end(() => {
        process.exit(0); // 确保退出
      });
    });
});

采用流的传输方式,避免数据堆积在有限的内存中。


Jason Lee
Hi, I'm Jason Lee

I hope I can still be curious about the world when I turn 60 years old.

Enjoy!

Contact me:

Email | GitHub


Content licenced under CC BY-NC-ND 4.0